package com.atguigu.flink.sql;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Created by Smexy on 2023/2/5
 *
 * <p>Demo job: reads text lines from a socket, maps them to {@link WaterSensor}
 * records, registers the stream as the view {@code t2}, and writes the sum of
 * {@code vc} grouped by {@code (id, ts)} into the {@code upsert-kafka} backed
 * table {@code t1}.
 */
public class Demo7_WriteKafkaAgg2
{
    /*
     * DDL of the sink table t1.
     *
     * The primary key must be declared NOT ENFORCED. Without it the job fails with:
     *
     *   Exception in thread "main" org.apache.flink.table.api.ValidationException:
     *       Flink doesn't support ENFORCED mode for PRIMARY KEY constraint.
     *
     * ENFORCED mode would mean that every written row is checked against the
     * primary key and rejected when it conflicts with an existing row. Kafka is
     * a messaging system and cannot reject a write on such a conflict, so only
     * the non-enforced mode can be used here.
     */
    private static final String SINK_TABLE_DDL =
        "create table t1( id string , ts bigint ,sumVc double , primary key(id,ts) NOT ENFORCED ) with("
            + "                    'connector' = 'upsert-kafka' ,"
            + "                    'topic' = 'topicE' , "
            + "                    'properties.bootstrap.servers' = 'hadoop102:9092' , "
            + "                    'value.format' = 'json'  ,"
            + "                    'key.format' = 'json'  "
            + "                    )";

    /* Aggregation over the view t2 whose result is inserted into t1. */
    private static final String AGGREGATE_INSERT_SQL =
        "insert into t1 select id,ts,sum(vc) a from t2 group by id,ts";

    /**
     * Entry point: builds the environments, registers the source view, creates
     * the sink table and runs the aggregating INSERT.
     *
     * @param args command line arguments (unused)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        registerSensorView(streamEnv, tableEnv);

        // Create the sink table first, then run the INSERT that feeds it.
        tableEnv.executeSql(SINK_TABLE_DDL);
        tableEnv.executeSql(AGGREGATE_INSERT_SQL);
    }

    /**
     * Reads lines from the socket at hadoop103:8888, converts them to
     * {@link WaterSensor} records and exposes the stream as the temporary view
     * {@code t2}.
     *
     * @param streamEnv environment used to create the socket source
     * @param tableEnv  table environment in which the view is registered
     */
    private static void registerSensorView(StreamExecutionEnvironment streamEnv,
                                           StreamTableEnvironment tableEnv) {
        SingleOutputStreamOperator<WaterSensor> sensorStream =
            streamEnv.socketTextStream("hadoop103", 8888).map(new WaterSensorMapFunction());

        Table sensorTable = tableEnv.fromDataStream(sensorStream);

        // Give the table a name so it can be referenced from SQL.
        tableEnv.createTemporaryView("t2", sensorTable);
    }
}
